package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AsciiString;
import org.jgroups.util.Buffer;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateException;
import java.security.spec.X509EncodedKeySpec;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ENCRYPT layer. Encrypt and decrypt communication in JGroups
 * 
 * This class can be used in two ways:
 * <ul>
 * <li> Option 1. Configured with a secretKey in a keystore so it can be used at
 * any layer in JGroups without the need for a coordinator, or if you want
 * protection against passive monitoring but do not want the key exchange
 * overhead and complexity. In this mode all nodes must be distributed with the
 * same keystore file.
 * <li> Option 2. Configured with algorithms and key sizes. The ENCRYPT layer in
 * this mode should be placed above the GMS protocol in the configuration. The
 * coordinator then chooses the secretkey which it distributes amongst all the
 * peers. In this form, no keystore exists as the keys are distributed using a
 * public/private key exchange. View changes that identify a new controller will
 * result in a new session key being generated and then distributed to all
 * peers. This overhead can be substantial in an application with a reasonable
 * peer churn.
 * </ul>
 * <p>
 * <p>
 * Each message is identified as encrypted with a specific encryption header
 * which identifies the type of encrypt header and an MD5 digest that identifies
 * the version of the key being used to encrypt/decrypt the messages.
 * <p>
 * <p>
 * <h2>Option 1</h2>
 * <br>
 * This is the simplest option and can be used by simply inserting the
 * Encryption layer at any point in the JGroups stack - it will encrypt all
 * Events of a type MSG that have a non-null message buffer. The format of the
 * entry in this form is:<br>
 * &lt;ENCRYPT key_store_name="defaultStore.keystore" store_password="changeit"
 * alias="myKey"/&gt;<br>
 * An example showing the keystore version can be found in
 * the conf in a file called EncryptKeyStore.xml - along with a
 * defaultStore.keystore file.<br>
 * In order to use the ENCRYPT layer in this manner, it is necessary to have the
 * secretKey already generated in a keystore file. The directory containing the
 * keystore file must be on the application's classpath. You cannot create a
 * SecretKey keystore file using the keytool application shipped with the JDK. A
 * java file called KeyStoreGenerator is included in the demo package that can
 * be used from the command line (or IDE) to generate a suitable keystore.
 * <p>
 * <p>
 * <h2>Option 2</h2>
 * <br>
 * This option is suited to an application that does not ship with a known key
 * but instead it is generated and distributed by the controller. The secret key
 * is first generated by the controller (in JGroups terms). When a view change
 * occurs, a peer will request the secret key by sending a key request with its
 * own public key. The controller encrypts the secret key with this key and
 * sends it back to the peer who then decrypts it and installs the key as its
 * own secret key. <br>
 * All encryption and decryption of messages is done using this key. When a peer
 * receives a view change that shows a different keyserver, it will repeat this
 * process - the view change event also triggers the ENCRYPT layer to queue up
 * and down messages until the new key is installed. The previous keys are
 * retained so that messages sent before the view change that are queued can be
 * decrypted if the key is different. <br>
 * An example EncryptNoKeyStore.xml is included in the conf file as a guide.
 * <p>
 * <p>
 * <br>
 * Note: the current version does not support the concept of perfect forward
 * encryption (PFE) which means that if a peer leaves the group the keys are
 * re-generated preventing the departed peer from decrypting future messages if
 * it chooses to listen in on the group. This is not included as it really
 * requires a suitable authentication scheme as well to make this feature useful
 * as there is nothing to stop the peer rejoining and receiving the new key. A
 * future release will address this issue.
 * 
 * @author Steve Woodcock
 * @author Bela Ban
 */
@MBean(description="Protocol which encrypts and decrypts cluster traffic")
public class ENCRYPT extends Protocol {
    private static final String DEFAULT_SYM_ALGO="AES";
    Address local_addr;
    Address keyServerAddr;
    boolean keyServer=false; //used to see whether we are the key server
    
    /* -----------------------------------------    Properties     -------------------------------------------------- */

    // encryption properties in no supplied key mode
    @Property(name="asym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
    String asymProvider=null;

    @Property(name="sym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
    String symProvider=null;

    @Property(name="asym_algorithm", description="Cipher engine transformation for asymmetric algorithm. Default is RSA")
    protected String asymAlgorithm="RSA";

    @Property(name="sym_algorithm", description="Cipher engine transformation for symmetric algorithm. Default is AES")
    String symAlgorithm=DEFAULT_SYM_ALGO;

    @Property(name="asym_init", description="Initial public/private key length. Default is 512")
    int asymInit=512;

    @Property(name="sym_init", description="Initial key length for matching symmetric algorithm. Default is 128")
    int symInit=128;

    @Property(name="change_keys", description="Generate new symmetric keys on every view change. Default is false. " +
      "Set this to true when using asymmetric encryption, to handle merging (JGRP-1907)")
    boolean changeKeysOnViewChange=false;

    // properties for functioning in supplied key mode
    private boolean suppliedKey=false;

    @Property(name="key_store_name", description="File on classpath that contains keystore repository")
    String keyStoreName;

    @Property(name="store_password",
              description="Password used to check the integrity/unlock the keystore. Change the default",
              exposeAsManagedAttribute=false)
    private String storePassword="changeit"; //JDK default

    @Property(name="key_password", description="Password for recovering the key. Change the default",
              exposeAsManagedAttribute=false)
    private String keyPassword=null; // allows to assign keypwd=storepwd if not set (https://issues.jboss.org/browse/JGRP-1375)


    @Property(name="alias", description="Alias used for recovering the key. Change the default",exposeAsManagedAttribute=false)
    private String alias="mykey"; // JDK default

    @Property(description="Number of ciphers in the pool to parallelize encrypt and decrypt requests",writable=false)
    protected int cipher_pool_size=8;


    // public/private Key
    KeyPair Kpair; // to store own's public/private Key

    //	 for client to store server's public Key
    PublicKey serverPubKey=null;

    // Cipher pools used for encryption and decryption. Size is cipher_pool_size
    protected Cipher[] encoding_ciphers, decoding_ciphers;

    // Locks to synchronize access to the cipher pools
    protected Lock[] encoding_locks, decoding_locks;

    protected final AtomicInteger cipher_index=new AtomicInteger(0); // the cipher and lock to select

    // version field for secret key
    protected byte[] symVersion;

    // shared secret key to encrypt/decrypt messages
    protected SecretKey secretKey;

    // map to hold previous keys so we can decrypt some earlier messages if we need to
    final Map<AsciiString,Cipher> keyMap=new WeakHashMap<AsciiString,Cipher>();

    // queues to buffer data while we are swapping shared key or obtaining key for first time
    private boolean queue_up=true;
    private boolean queue_down=false;

    // queue to hold upcoming messages while key negotiation is happening
    private BlockingQueue<Message> upMessageQueue=new LinkedBlockingQueue<Message>();

    //	 queue to hold downcoming messages while key negotiation is happening
    private BlockingQueue<Message> downMessageQueue=new LinkedBlockingQueue<Message>();
    // decrypting cypher for secret key requests
    private Cipher asymCipher;

    /** determines whether to encrypt the entire message, or just the buffer */
    @Property
    private boolean encrypt_entire_message=false;




    /**
     * Picks the next slot of the cipher/lock pools by advancing the shared counter.
     * The counter is masked with {@code cipher_pool_size-1}, which behaves like a modulo
     * operation only when the pool size is a power of 2.
     *
     * @return the counter value (before incrementing) masked with {@code cipher_pool_size-1}
     */
    protected int getNextIndex() {
        final int mask=cipher_pool_size - 1;
        return cipher_index.getAndIncrement() & mask;
    }

    /** @return the configured key length used for the asymmetric key pair */
    public int getAsymInit() {
        return asymInit;
    }

    /** @return the current shared secret key (same value as {@link #getSecretKey()}) */
    public SecretKey getDesKey() {
        return secretKey;
    }

    /** @return this member's own public/private key pair */
    public KeyPair getKpair() {
        return Kpair;
    }

    /** @return the asymmetric cipher held by this protocol */
    public Cipher getAsymCipher() {
        return asymCipher;
    }

    /** @return the symmetric cipher transformation */
    public String getSymAlgorithm() {
        return symAlgorithm;
    }

    /** @return the configured key length for the symmetric key */
    public int getSymInit() {
        return symInit;
    }

    /** @return the asymmetric cipher transformation */
    public String getAsymAlgorithm() {
        return asymAlgorithm;
    }

    /** @return the version of the current secret key (the internal array, not a copy) */
    public byte[] getSymVersion() {
        return symVersion;
    }

    /** @return the current shared secret key */
    public SecretKey getSecretKey() {
        return secretKey;
    }

    /** @return the decoding cipher at the next pool index; advances the pool counter */
    public Cipher getSymDecodingCipher() {
        return decoding_ciphers[getNextIndex()];
    }

    /** @return the encoding cipher at the next pool index; advances the pool counter */
    public Cipher getSymEncodingCipher() {
        return encoding_ciphers[getNextIndex()];
    }

    /** @return the address currently recorded as key server */
    public Address getKeyServerAddr() {
        return keyServerAddr;
    }

    /** Stores a private copy of the given key version. */
    private void setSymVersion(byte[] symVersion) {
        this.symVersion=symVersion.clone();
    }

    /** Replaces the shared secret key. */
    private void setSecretKey(SecretKey secretKey) {
        this.secretKey=secretKey;
    }

    /** Sets the local address of this member. */
    protected void setLocalAddress(Address local_addr) {
        this.local_addr=local_addr;
    }

    /** Sets the address recorded as key server. */
    protected void setKeyServerAddr(Address keyServerAddr) {
        this.keyServerAddr=keyServerAddr;
    }


    /**
     * Extracts the algorithm name from a transformation string of the form
     * "algorithm/mode/padding".
     *
     * @param s the transformation string, e.g. "AES/CBC/PKCS5Padding" or just "AES"
     * @return the part of {@code s} before the first '/', or {@code s} itself if it contains no '/'
     */
    private static String getAlgorithm(String s) {
        final int slash=s.indexOf("/");
        return slash < 0? s : s.substring(0, slash);
    }



    /**
     * Initializes the protocol: resolves the key password, obtains the secret key (from the
     * configured keystore, or by generating a symmetric key and an asymmetric key pair),
     * normalizes the cipher pool size and creates the cipher and lock pools.
     *
     * @throws Exception if the key material or the ciphers cannot be set up
     */
    public void init() throws Exception {
        // fall back to the store password when no separate key password was configured
        if(keyPassword == null && storePassword != null) {
            keyPassword=storePassword;
            log.debug("key_password used is same as store_password");
        }

        final boolean keystore_configured=keyStoreName != null;
        if(keystore_configured)
            initConfiguredKey();
        else {
            initSymKey();
            initKeyPair();
        }

        if(cipher_pool_size <= 0) {
            log.warn("cipher_pool_size of %d is invalid; setting it to 1", cipher_pool_size);
            cipher_pool_size=1;
        }

        // getNextIndex() masks with (size-1), so the pool size is adjusted to the value
        // returned by Util.getNextHigherPowerOfTwo() if it differs
        final int adjusted_size=Util.getNextHigherPowerOfTwo(cipher_pool_size);
        if(adjusted_size != cipher_pool_size) {
            log.warn("setting cipher_pool_size (%d) to %d (power of 2) for faster modulo operation", cipher_pool_size, adjusted_size);
            cipher_pool_size=adjusted_size;
        }

        final int size=cipher_pool_size;
        encoding_ciphers=new Cipher[size];
        encoding_locks=new Lock[size];
        decoding_ciphers=new Cipher[size];
        decoding_locks=new Lock[size];

        initSymCiphers(symAlgorithm, getSecretKey());
    }

    /**
     * Initialisation if a supplied key is defined in the properties. The key is read from a
     * JCEKS keystore named by {@code keyStoreName}, which is looked up first as a resource
     * of the thread's context class loader and then as a plain file. On success the key is
     * installed as secret key, supplied-key mode is switched on and queueing is disabled.
     *
     * @throws KeyStoreException if the JCEKS keystore type is not available
     * @throws UnrecoverableKeyException if the key cannot be recovered with the key password
     * @throws Exception if the keystore cannot be found or loaded, or if the alias does not
     *                   refer to a secret key; the underlying exception is attached as cause
     */
    private void initConfiguredKey() throws Exception {
        InputStream inputStream=null;
        // must not use default keystore type - as does not support secret keys
        KeyStore store=KeyStore.getInstance("JCEKS");

        try {
            // load in keystore using this thread's classloader (which may be null)
            ClassLoader loader=Thread.currentThread().getContextClassLoader();
            if(loader != null)
                inputStream=loader.getResourceAsStream(keyStoreName);

            // not on the classpath: try the file system
            if(inputStream == null) {
                try {
                    inputStream=new FileInputStream(keyStoreName);
                }
                catch(FileNotFoundException e) {
                    // we can't find a keystore here -
                    throw new Exception("Unable to load keystore " + keyStoreName
                                        + " ensure file is on classpath", e);
                }
            }

            // we have located a file lets load the keystore
            java.security.Key tempKey;
            try {
                // a null password is accepted by KeyStore.load() (no integrity check)
                store.load(inputStream, storePassword != null? storePassword.toCharArray() : null);
                // loaded keystore - get the key
                tempKey=store.getKey(alias, keyPassword != null? keyPassword.toCharArray() : null);
            }
            catch(IOException e) {
                throw new Exception("Unable to load keystore " + keyStoreName + ": " + e, e);
            }
            catch(NoSuchAlgorithmException e) {
                throw new Exception("No Such algorithm " + keyStoreName + ": " + e, e);
            }
            catch(CertificateException e) {
                throw new Exception("Certificate exception " + keyStoreName + ": " + e, e);
            }

            if(tempKey == null)
                throw new Exception("Unable to retrieve key '" + alias + "' from keystore " + keyStoreName);
            // the alias could refer to e.g. a private key; fail with a clear message instead of a ClassCastException
            if(!(tempKey instanceof SecretKey))
                throw new Exception("Key '" + alias + "' in keystore " + keyStoreName + " is not a secret key");

            //set the key here
            setSecretKey((SecretKey)tempKey);

            // only take over the key's algorithm if the transformation was left at its default
            if(symAlgorithm.equals(DEFAULT_SYM_ALGO))
                symAlgorithm=tempKey.getAlgorithm();

            // set the fact we are using a supplied key
            suppliedKey=true;
            queue_down=queue_up=false;
        }
        finally {
            Util.close(inputStream);
        }

    }

    /**
     * Generates a fresh symmetric secret key from the configured algorithm, provider and key
     * length, and installs it as the current secret key. Used when no keystore is supplied.
     *
     * @throws Exception if the key generator cannot be obtained or initialized
     */
    public void initSymKey() throws Exception {
        // only use the provider if one was actually configured
        final boolean use_provider=symProvider != null && !symProvider.trim().isEmpty();
        final String algo_name=getAlgorithm(symAlgorithm);

        final KeyGenerator generator=use_provider?
          KeyGenerator.getInstance(algo_name, symProvider) : KeyGenerator.getInstance(algo_name);

        // generate the key using the defined init properties
        generator.init(symInit);
        setSecretKey(generator.generateKey());
        log.debug("symmetric key generated ");
    }

    /**
     * Fills the encoding and decoding cipher pools (and their locks) with ciphers initialized
     * from the given secret key, then sets the key version to the MD5 digest of the encoded key.
     *
     * @param algorithm the cipher transformation to instantiate
     * @param secret the secret key used to initialize every cipher in the pools
     * @throws Exception if a cipher or the digest cannot be created or initialized
     */
    private void initSymCiphers(String algorithm, SecretKey secret) throws Exception {
        log.debug("initializing symmetric ciphers (pool size=%d)",cipher_pool_size);

        final boolean use_provider=symProvider != null && !symProvider.trim().isEmpty();
        for(int slot=0; slot < cipher_pool_size; slot++) {
            final Cipher encoder=use_provider? Cipher.getInstance(algorithm, symProvider) : Cipher.getInstance(algorithm);
            encoding_ciphers[slot]=encoder;
            encoder.init(Cipher.ENCRYPT_MODE, secret);

            final Cipher decoder=use_provider? Cipher.getInstance(algorithm, symProvider) : Cipher.getInstance(algorithm);
            decoding_ciphers[slot]=decoder;
            decoder.init(Cipher.DECRYPT_MODE, secret);

            encoding_locks[slot]=new ReentrantLock();
            decoding_locks[slot]=new ReentrantLock();
        }

        // the version identifying this key is the MD5 digest of its encoded form
        final MessageDigest md5=MessageDigest.getInstance("MD5");
        final byte[] key_digest=md5.digest(secret.getEncoded());
        symVersion=key_digest.clone();
        log.debug("initialized symmetric ciphers with secret key (" + symVersion.length + " bytes)");
    }

   /* public static String byteArrayToHexString(byte[] b){
        StringBuilder sb = new StringBuilder(b.length * 2);
        for (int i = 0; i < b.length; i++){
            int v = b[i] & 0xff;
            if (v < 16) { sb.append('0'); }
            sb.append(Integer.toHexString(v));
        }
        return sb.toString().toUpperCase();
    }*/

    /**
     * Generates this member's public/private key pair from the configured asymmetric
     * algorithm, provider and key length, and prepares the asymmetric cipher in decrypt mode
     * with the private key.
     *
     * @throws Exception if the key pair generator or the cipher cannot be created or initialized
     */
    public void initKeyPair() throws Exception {
        // only use the provider if one was actually configured
        final boolean use_provider=asymProvider != null && !asymProvider.trim().isEmpty();
        final String algo_name=getAlgorithm(asymAlgorithm);

        // generate the public and private key
        final KeyPairGenerator generator=use_provider?
          KeyPairGenerator.getInstance(algo_name, asymProvider) : KeyPairGenerator.getInstance(algo_name);
        generator.initialize(asymInit, new SecureRandom());
        Kpair=generator.generateKeyPair();

        // set up the Cipher to decrypt secret key responses encrypted with our key
        asymCipher=use_provider? Cipher.getInstance(asymAlgorithm, asymProvider) : Cipher.getInstance(asymAlgorithm);
        asymCipher.init(Cipher.DECRYPT_MODE, Kpair.getPrivate());
        log.debug("asym algo initialized");
    }


    /**
     * Handles an event travelling up the stack. Messages are handed to
     * {@link #handleUpMessage(Event)}; view events trigger key server handling (unless a
     * supplied key is used) and are then passed up like all other events.
     *
     * @param evt the event received from the protocol below
     * @return the result of handling the message, {@code null} if handling a message threw an
     *         exception, otherwise the result of passing the event up
     */
    public Object up(Event evt) {
        final int type=evt.getType();

        // we try and decrypt all messages
        if(type == Event.MSG) {
            try {
                return handleUpMessage(evt);
            }
            catch(Exception e) {
                log.warn("exception occurred decrypting message", e);
                return null;
            }
        }

        if(type == Event.VIEW_CHANGE) {
            final View new_view=(View)evt.getArg();
            log.debug("new view: " + new_view);
            if(!suppliedKey)
                handleViewChange(new_view, false);
        }
        else if(type == Event.TMP_VIEW) {
            final View tmp_view=(View)evt.getArg();
            if(!suppliedKey)
                handleViewChange(tmp_view, true);
        }

        return up_prot.up(evt);
    }


    /**
     * Handles a message batch travelling up the stack: applies a {@code Decrypter} to the
     * batch, calls {@code unlock()} on it afterwards, and passes the batch up unless it ended
     * up empty.
     *
     * @param batch the batch received from the protocol below
     */
    public void up(MessageBatch batch) {
        final Decrypter visitor=new Decrypter();
        batch.map(visitor);
        visitor.unlock();

        if(batch.isEmpty())
            return; // nothing left to deliver
        up_prot.up(batch);
    }



    /**
     * Reacts to a new view: optionally creates a new symmetric key, then either becomes key
     * server or starts talking to the new key server (the first member of the view).
     *
     * @param view the new view
     * @param makeServer if true, a new symmetric key may be initialized first and this member
     *                   becomes key server regardless of its position in the view
     */
    private synchronized void handleViewChange(View view, boolean makeServer) {
        final boolean is_merge=view instanceof MergeView;
        if(makeServer)
            initializeNewSymmetricKey(is_merge);

        // if view is a bit broken set me as keyserver
        final List<Address> members=view.getMembers();
        final boolean broken_view=members == null || members.isEmpty() || members.get(0) == null;
        if(broken_view) {
            becomeKeyServer(local_addr, false);
            return;
        }

        // otherwise the first member of the view is the key server
        final Address coord=view.getMembers().get(0);

        // I am keyserver - either first member of group or old key server is no more and
        // I have been voted new controller
        final boolean i_am_server=makeServer || coord.equals(local_addr);
        if(i_am_server)
            becomeKeyServer(coord, makeServer);
        else
            handleNewKeyServer(coord, is_merge);
    }

	/**
	 * Creates a new symmetric key and re-initializes the cipher pools, unless this member is
	 * already key server, keys are not changed on view changes and the view is not a merge view.
	 *
	 * @param merge_view whether the view that triggered this call is a merge view
	 * @throws IllegalStateException if key or cipher initialization fails with a checked exception
	 */
	private void initializeNewSymmetricKey(boolean merge_view) {
		// an established key server keeps its key unless rotation is requested or we are merging
		final boolean keep_current_key=keyServer && !changeKeysOnViewChange && !merge_view;
		if(keep_current_key)
			return;

		try {
			log.debug("initalizing new ciphers");
			initSymKey();
			initSymCiphers(getSymAlgorithm(), getSecretKey());
		}
		catch(RuntimeException e) {
			log.error("could not initialize new ciphers", e);
			throw e;
		}
		catch(Exception e) {
			log.error("could not initialize new ciphers", e);
			throw new IllegalStateException(e);
		}
	}

    /**
     * Makes this member the key server: records the key server address, sets the key server
     * flag and switches off queueing in both directions.
     *
     * @param tmpKeyServer the address to record as key server
     * @param forced if true, the "I have become the new key server" debug message is suppressed
     */
    private void becomeKeyServer(Address tmpKeyServer, boolean forced) {
        keyServerAddr=tmpKeyServer;
        keyServer=true;

        final boolean announce=!forced && log.isDebugEnabled();
        if(announce)
            log.debug("%s: I have become the new key server", local_addr);

        // we own the key, so there is nothing to wait for
        queue_down=false;
        queue_up=false;
    }

    /**
     * Prepares this peer for a (possibly) new key server. If keys change on every view
     * change, the key server differs from the recorded one, or the view is a merge view,
     * queueing is switched on in both directions and a key request is sent to the key server.
     * Otherwise nothing happens.
     *
     * @param newKeyServer the address of the key server according to the new view
     * @param merge_view whether the new view is a merge view
     */
    private void handleNewKeyServer(Address newKeyServer, boolean merge_view) {
        final boolean need_new_key=changeKeysOnViewChange || keyServerChanged(newKeyServer) || merge_view;
        if(!need_new_key)
            return;

        // start queueing until we have new key
        // to make sure we are not sending with old key
        queue_up=true;
        queue_down=true;

        // set new keyserver address
        keyServerAddr=newKeyServer;
        keyServer=false;

        log.debug("%s: %s has become the new key server, sending key request to it", local_addr, keyServerAddr);
        sendKeyRequest();
    }

	/**
	 * Tells whether the given address differs from the currently recorded key server.
	 *
	 * @param newKeyServer the candidate key server address
	 * @return true if no key server is recorded yet or it is not equal to {@code newKeyServer}
	 */
	private boolean keyServerChanged(Address newKeyServer) {
		if(keyServerAddr == null)
			return true;
		return !keyServerAddr.equals(newKeyServer);
	}


    /**
     * Dispatches a message event received from below. Messages that are null, empty (when
     * only buffers are encrypted) or carry no encrypt header are passed up unchanged.
     * Messages with an ENCRYPT header are decrypted; all other encrypt headers are treated as
     * control messages and consumed.
     *
     * @param evt the message event
     * @return the result of passing the (possibly decrypted) message up, or {@code null} for
     *         control messages
     * @throws Exception if handling the encrypted message fails
     */
    private Object handleUpMessage(Event evt) throws Exception {
        final Message msg=(Message)evt.getArg();
        if(msg == null)
            return up_prot.up(evt);
        if(msg.getLength() == 0 && !encrypt_entire_message)
            return up_prot.up(evt);

        final EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
        if(hdr == null)
            return up_prot.up(evt); // not one of ours

        if(log.isTraceEnabled())
            log.trace("header received %s", hdr);

        if(hdr.getType() == EncryptHeader.ENCRYPT)
            return handleEncryptedMessage(msg, evt, hdr);

        handleUpEvent(msg, hdr);
        return null;
    }



    /**
     * Handles a message carrying an ENCRYPT header. While up-queueing is on, the message is
     * put on the up queue; otherwise previously queued messages are drained first (unless a
     * supplied key is used) and a copy of the message is decrypted and passed up.
     *
     * @param msg the encrypted message
     * @param evt the event that carried the message
     * @param hdr the encrypt header of the message
     * @return {@code null} if the message was queued or could not be decrypted, otherwise the
     *         result of passing the decrypted message up
     * @throws Exception if queueing or decryption fails
     */
    protected Object handleEncryptedMessage(Message msg, Event evt, EncryptHeader hdr) throws Exception {
        // if queueing then pass into queue to be dealt with later
        if(queue_up) {
            log.trace("queueing up message as no session key established: %s", msg);
            upMessageQueue.put(msg);
            return null;
        }

        // make sure we pass up any queued messages first
        // could be more optimised but this can wait we only need this if not using supplied key
        if(!suppliedKey)
            drainUpQueue();

        // we need to copy msg as we modify its buffer (http://jira.jboss.com/jira/browse/JGRP-538);
        // the original may still be needed for retransmissions
        final Message decrypted=decryptMessage(null, msg.copy());
        if(decrypted == null) {
            log.warn("unrecognised cipher; discarding message");
            return null;
        }
        return up_prot.up(new Event(Event.MSG, decrypted));
    }

    /**
     * Handles an encrypt control message (any encrypt header other than ENCRYPT).
     * <ul>
     * <li>KEY_REQUEST: the message buffer is turned into the peer's public key and the
     * current secret key is sent back to the sender, encrypted with that key.
     * <li>SECRETKEY: the message buffer is decoded into a secret key which is installed
     * together with the version from the header; if decoding yields null, a new key request
     * is sent.
     * </ul>
     * Control messages are ignored (with a warning) when a supplied key is used. Failures are
     * logged together with their cause and never propagated.
     *
     * @param msg the control message
     * @param hdr the encrypt header of the message
     */
    protected void handleUpEvent(Message msg, EncryptHeader hdr) {
        // check if we had some sort of encrypt control header if using supplied key we should not process it
        if(suppliedKey) {
            log.warn("we received an encrypt header of %s while in configured mode",hdr.getType());
            return;
        }

        // see what sort of encrypt control message we have received
        switch(hdr.getType()) {
            // if a key request
            case EncryptHeader.KEY_REQUEST:
                log.debug("received a key request from peer %s", msg.getSrc());

                // if a key request send response key back
                try {
                    // extract peer's public key
                    PublicKey tmpKey=generatePubKey(msg.getBuffer());
                    // send back the secret key we have
                    sendSecretKey(getSecretKey(), tmpKey, msg.getSrc());
                }
                catch(Exception e) {
                    // keep the cause: without it a failed key exchange cannot be diagnosed
                    log.warn("unable to reconstitute peer's public key", e);
                }
                break;
            case EncryptHeader.SECRETKEY:
                // NOTE(review): the sender is not compared with keyServerAddr here, so a key
                // response from any member is installed - confirm whether this is intended
                log.debug("received a secretkey response from keyserver %s", msg.getSrc());

                try {
                    SecretKey tmp=decodeKey(msg.getBuffer());
                    if(tmp == null)
                        sendKeyRequest(); // unable to understand response, let's try again
                    else {
                        // otherwise lets set the returned key as the shared key
                        setKeys(tmp, hdr.getVersion());
                        log.debug("decoded secretkey response");
                    }
                }
                catch(Exception e) {
                    log.warn("unable to process received secret key", e);
                }
                break;
            default:
                log.warn("received ignored encrypt header of %s", hdr.getType());
                break;
        }
    }


    /**
     * Drains the up queue: every queued message is copied, decrypted and, if decryption
     * yielded a message, passed up the stack. A failure on one message is logged and does not
     * stop the draining of the remaining messages.
     * <p>
     * This method is not synchronized; it may run concurrently on several threads, each of
     * which removes messages from the (thread-safe) queue independently.
     */
    private void drainUpQueue() {
        if(log.isTraceEnabled()) {
            int size=upMessageQueue.size();
            if(size > 0)
                log.trace("draining %d messages from the up queue", size);
        }

        // non-blocking poll(): unlike the timed variant it cannot throw InterruptedException,
        // so a pending interrupt of the calling thread is neither consumed nor logged as error
        Message queued;
        while((queued=upMessageQueue.poll()) != null) {
            try {
                Message msg=decryptMessage(null, queued.copy()); // need to copy for possible xmits
                if(msg != null)
                    up_prot.up(new Event(Event.MSG, msg));
            }
            catch(Throwable t) {
                log.error("failed decrypting and sending message up when draining queue", t);
            }
        }
    }


    /**
     * Drains the down queue: every queued message is encrypted and sent down the stack. A
     * failure on one message is logged and does not stop the draining of the remaining
     * messages.
     */
    private void drainDownQueue() {
        if(log.isTraceEnabled()) {
            int size=downMessageQueue.size();
            if(size > 0)
                log.trace("draining %d messages from the down queue", size);
        }

        // non-blocking poll(): unlike the timed variant it cannot throw InterruptedException,
        // so a pending interrupt of the calling thread is neither consumed nor logged as error
        Message queued;
        while((queued=downMessageQueue.poll()) != null) {
            try {
                encryptAndSend(queued);
            }
            catch(Throwable t) {
                log.error("failed sending message down when draining queue", t);
            }
        }
    }


    /**
     * Installs a new shared secret key and drains the queues. The drains could be
     * called at the same time as the up/down messages calling in to the class,
     * so we may have an extra call to the drain methods, but this slight expense
     * is better than the alternative of waiting until the next message to
     * trigger the drains, which may never happen.
     * <p>
     * Before the new key is installed, one decoding cipher of the current key is stored in
     * the key map under the current key version, so that messages encrypted with the
     * previous key can still be decrypted.
     * 
     * @param key the new secret key
     * @param version the version to associate with the new key; must not be null as it is copied
     * @throws Exception if the ciphers cannot be initialized with the new key
     */
    private void setKeys(SecretKey key, byte[] version) throws Exception {

        // put the previous key into the map
        // if the keys are already there then they will overwrite
        // NOTE(review): keyMap is a WeakHashMap and the AsciiString key created here is not
        // referenced anywhere else in this method, so the entry may be reclaimed by the
        // garbage collector at any time - confirm that previous ciphers survive long enough
        keyMap.put(new AsciiString(getSymVersion()), getSymDecodingCipher());

        setSecretKey(key);
        // initSymCiphers() sets symVersion to the MD5 digest of the key ...
        initSymCiphers(key.getAlgorithm(), key);
        // ... which is then replaced by the version passed in by the caller
        setSymVersion(version);

        // stop queueing up messages first, then deliver the ones queued so far
        log.debug("setting queue up to false in setKeys");
        queue_up=false;
        drainUpQueue();

        // same for the down direction
        queue_down=false;
        drainDownQueue();
    }

    /**
     * Decrypts a message. If the key version in the message's encrypt header matches the
     * current version, the given cipher is used (or a pooled one if it is null); otherwise a
     * cipher stored for that version in the key map is looked up and used.
     *
     * @param cipher the cipher to use for the current key version, may be null
     * @param msg the message to decrypt; must carry an encrypt header
     * @return the decrypted message, or {@code null} if the message uses a different key
     *         version for which no cipher is stored
     * @throws Exception if decryption fails
     */
    private Message decryptMessage(Cipher cipher, Message msg) throws Exception {
        final EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
        final byte[] msg_version=hdr.getVersion();

        // common case: message was encrypted with the current key
        if(Arrays.equals(msg_version, getSymVersion()))
            return _decrypt(cipher, msg, hdr.encryptEntireMessage());

        log.warn("attempting to use stored cipher as message does not use current encryption version ");
        final Cipher previous=keyMap.get(new AsciiString(msg_version));
        if(previous == null) {
            log.warn("unable to find a matching cipher in previous key map");
            return null;
        }

        log.trace("decrypting using previous cipher version");
        // stored ciphers are not protected by the pool locks, so serialize access on the cipher itself
        synchronized(previous) {
            return _decrypt(previous, msg, hdr.encryptEntireMessage());
        }
    }

    /**
     * Decrypts the buffer of a message, either with the given cipher or, if it is null, via
     * {@code code()} in decode mode. If only the buffer was encrypted, the decrypted bytes
     * replace the buffer of {@code msg}; if the entire message was encrypted, the decrypted
     * bytes are unmarshalled into a new message whose missing destination and source are
     * taken from {@code msg}.
     *
     * @param cipher the cipher to use, or null
     * @param msg the message whose buffer is decrypted
     * @param decrypt_entire_msg whether the buffer contains an entire serialized message
     * @return {@code msg} with the decrypted buffer, or the unmarshalled message
     * @throws Exception if decryption or unmarshalling fails
     */
    private Message _decrypt(final Cipher cipher, Message msg, boolean decrypt_entire_msg) throws Exception {
        final byte[] plain=cipher != null
          ? cipher.doFinal(msg.getRawBuffer(), msg.getOffset(), msg.getLength())
          : code(msg.getRawBuffer(), msg.getOffset(), msg.getLength(), true);

        if(decrypt_entire_msg) {
            final Message unmarshalled=Util.streamableFromBuffer(Message.class, plain, 0, plain.length);
            // fill in addresses missing in the unmarshalled message from the carrier message
            if(unmarshalled.getDest() == null)
                unmarshalled.setDest(msg.getDest());
            if(unmarshalled.getSrc() == null)
                unmarshalled.setSrc(msg.getSrc());
            return unmarshalled;
        }

        msg.setBuffer(plain);
        return msg;
    }

    /**
     * Encrypts the given secret key with a peer's public key and sends it to that peer in a
     * message carrying a SECRETKEY header with the current key version.
     *
     * @param secret the secret key to send
     * @param pubKey the peer's public key, used to encrypt the secret key
     * @param source the address of the peer that requested the key
     * @throws Exception if the asymmetric cipher cannot be created or encryption fails
     */
    private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) throws Exception {
        // create a cipher with peer's public key
        Cipher tmp;
        if (asymProvider != null && !asymProvider.trim().isEmpty())
            tmp=Cipher.getInstance(asymAlgorithm, asymProvider);
        else
            tmp=Cipher.getInstance(asymAlgorithm);
        tmp.init(Cipher.ENCRYPT_MODE,pubKey);

        //encrypt current secret key
        byte[] encryptedKey=tmp.doFinal(secret.getEncoded());
        Message newMsg=new Message(source, local_addr, encryptedKey)
          .putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion()));

        // the version is a binary digest: log it as hex rather than decoding the raw bytes
        // with the platform charset, and only build the string when it is actually logged
        if(log.isDebugEnabled()) {
            StringBuilder hexVersion=new StringBuilder();
            for(byte b: getSymVersion())
                hexVersion.append(String.format("%02x", b));
            log.debug("sending version %s encoded key to client", hexVersion.toString());
        }
        down_prot.down(new Event(Event.MSG,newMsg));
    }



    /**
     * Sends a key request to the current key server: the message buffer holds this member's
     * encoded public key and the header is a KEY_REQUEST header with the current key version.
     */
    private void sendKeyRequest() {
        final byte[] encoded_public_key=Kpair.getPublic().getEncoded();
        final EncryptHeader request_hdr=new EncryptHeader(EncryptHeader.KEY_REQUEST, getSymVersion());

        final Message request=new Message(keyServerAddr, local_addr, encoded_public_key)
          .putHeader(this.id, request_hdr);
        down_prot.down(new Event(Event.MSG, request));
    }

    /**
     * Handles an event travelling down the stack. Non-empty messages (or all messages when
     * entire messages are encrypted) are either queued while a key is outstanding, or
     * encrypted and sent; they are consumed here. View events trigger key server handling
     * (unless a supplied key is used), the local address is recorded, and all events other
     * than consumed messages are passed down.
     *
     * @param evt the event received from the protocol above
     * @return {@code null} for consumed messages, otherwise the result of passing the event down
     */
    public Object down(Event evt) {
        switch(evt.getType()) {

            case Event.MSG: {
                final Message msg=(Message)evt.getArg();
                final boolean nothing_to_encrypt=msg.getLength() == 0 && !encrypt_entire_message;
                if(nothing_to_encrypt)
                    break; // passed down unencrypted below

                try {
                    if(!queue_down) {
                        // make sure the down queue is drained first to keep ordering
                        if(!suppliedKey)
                            drainDownQueue();
                        encryptAndSend(msg);
                    }
                    else {
                        log.trace("queueing down message as no session key established: %s", msg);
                        downMessageQueue.put(msg); // queue messages if we are waiting for a new key
                    }
                }
                catch(Exception e) {
                    log.warn("unable to send message down", e);
                }
                return null;
            }

            case Event.VIEW_CHANGE: {
                final View new_view=(View)evt.getArg();
                log.debug("new view: " + new_view);
                if(!suppliedKey)
                    handleViewChange(new_view, false);
                break;
            }

            case Event.SET_LOCAL_ADDRESS: {
                local_addr=(Address)evt.getArg();
                log.debug("set local address to %s", local_addr);
                break;
            }

            case Event.TMP_VIEW: {
                final View tmp_view=(View)evt.getArg();
                // if a tmp_view then we are trying to become coordinator so
                // make us keyserver
                if(!suppliedKey)
                    handleViewChange(tmp_view, true);
                break;
            }
        }
        return down_prot.down(evt);
    }





    /**
     * Encrypts {@code msg} and passes an encrypted copy down the stack; the message object passed
     * in is never given the encrypted buffer.
     * <p>
     * With {@code encrypt_entire_message} set, the whole message is marshalled and the marshalled
     * bytes are encrypted; the header is flagged with {@link EncryptHeader#ENCRYPT_ENTIRE_MSG}.
     * Otherwise only the payload is encrypted.
     *
     * @param msg the message to encrypt. When the entire message is encrypted and its source is
     *            null, the source is set to {@code local_addr} before marshalling
     * @throws Exception if marshalling or encryption fails
     */
    private void encryptAndSend(Message msg) throws Exception {
        EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, getSymVersion());

        if(!encrypt_entire_message) {
            // copy needed because the same message (object) may be retransmitted -> no double encryption
            Message payload_encrypted=msg.copy(false).putHeader(this.id, hdr);
            payload_encrypted.setBuffer(code(msg.getRawBuffer(), msg.getOffset(), msg.getLength(), false));
            down_prot.down(new Event(Event.MSG, payload_encrypted));
            return;
        }

        hdr.type|=EncryptHeader.ENCRYPT_ENTIRE_MSG;
        if(msg.getSrc() == null)
            msg.setSrc(local_addr);

        Buffer marshalled=Util.streamableToBuffer(msg);
        byte[] cipher_text=code(marshalled.getBuf(), marshalled.getOffset(), marshalled.getLength(), false);

        // exclude existing headers, they will be seen again when we decrypt and unmarshal the msg at the receiver
        Message wrapper=msg.copy(false, false);
        wrapper.setBuffer(cipher_text);
        wrapper.putHeader(this.id, hdr);
        down_prot.down(new Event(Event.MSG, wrapper));
    }


    /**
     * Encrypts or decrypts a byte range with one of the pooled ciphers, holding the lock that
     * belongs to the selected cipher for the duration of the operation.
     *
     * @param buf    the input buffer
     * @param offset the offset into {@code buf} at which the input starts
     * @param length the number of bytes to process
     * @param decode true to use a decoding cipher, false to use an encoding cipher
     * @return the result of {@code Cipher.doFinal()} on the given range
     * @throws Exception if the cipher operation fails
     */
    private byte[] code(byte[] buf, int offset, int length, boolean decode) throws Exception {
        final int slot=getNextIndex();
        final Lock guard;
        final Cipher worker;

        if(decode) {
            guard=decoding_locks[slot];
            worker=decoding_ciphers[slot];
        }
        else {
            guard=encoding_locks[slot];
            worker=encoding_ciphers[slot];
        }

        guard.lock();
        try {
            return worker.doFinal(buf, offset, length);
        }
        finally {
            guard.unlock();
        }
    }


    /**
     * Tries to decode the secret key sent from the key server.
     * <p>
     * The received bytes are first run through {@code asymCipher}, then wrapped in a
     * {@link SecretKeySpec} and checked by initializing a throw-away symmetric cipher with it.
     *
     * @param encodedKey the encrypted secret key as received
     * @return the reconstituted key, or null if the key spec could not be created or the test
     *         cipher could not be initialized with it
     * @throws Exception if the asymmetric cipher operation on {@code encodedKey} fails
     */
    private SecretKeySpec decodeKey(byte[] encodedKey) throws Exception {
        byte[] keyBytes;

        // asymCipher is a shared instance: serialize its use on this protocol's monitor
        synchronized(this) {
            keyBytes=asymCipher.doFinal(encodedKey);
        }

        try {
            SecretKeySpec keySpec=new SecretKeySpec(keyBytes, getAlgorithm(symAlgorithm));

            // test reconstituted key to see if valid
            Cipher temp;
            if (symProvider != null && !symProvider.trim().isEmpty())
                temp=Cipher.getInstance(symAlgorithm, symProvider);
            else
                temp=Cipher.getInstance(symAlgorithm);
            // Cipher.SECRET_KEY is a key-type constant for Cipher.unwrap(), not an operation mode; it only
            // worked because its value equals WRAP_MODE. Use a real opmode to validate the key.
            temp.init(Cipher.ENCRYPT_MODE, keySpec);
            return keySpec;
        }
        catch(Exception e) {
            log.error("failed decoding key", e);
            return null;
        }
    }

    /**
     * Reconstitutes a public key that a peer sent in encoded (byte) form.
     * <p>
     * The bytes are interpreted as an X.509 encoded key spec and handed to a {@link KeyFactory}
     * obtained for {@code getAlgorithm(asymAlgorithm)}.
     *
     * @param encodedKey the encoded public key
     * @return the public key, or null if it could not be reconstituted (the failure is logged)
     */
    private PublicKey generatePubKey(byte[] encodedKey) {
        PublicKey pubKey=null;
        try {
            KeyFactory keyFactory=KeyFactory.getInstance(getAlgorithm(asymAlgorithm));
            X509EncodedKeySpec x509KeySpec=new X509EncodedKeySpec(encodedKey);
            pubKey=keyFactory.generatePublic(x509KeySpec);
        }
        catch(Exception e) {
            // report through the protocol's logger rather than dumping the stack trace to stderr
            log.error("failed generating public key from encoded key", e);
        }
        return pubKey;
    }




    /**
     * Decrypts all messages in a batch, replacing encrypted messages in-place with their decrypted versions.
     * <p>
     * A lock/cipher pair is taken from the decoding pool when the first encrypted message is visited.
     * The lock is acquired at that point and is not released by {@link #visit}; {@link #unlock()}
     * releases it.
     */
    protected class Decrypter implements MessageBatch.Visitor<Message> {
        protected Lock   lock;
        protected Cipher cipher;

        /**
         * Processes one message of the batch.
         * <ul>
         * <li>Null messages, messages without an ENCRYPT header, and empty messages (unless
         *     {@code encrypt_entire_message} is set) are left untouched.
         * <li>Encrypted messages are queued while {@code queue_up} is set, otherwise decrypted and
         *     replaced in the batch.
         * <li>All other ENCRYPT messages are removed from the batch and given to {@code handleUpEvent()}.
         * </ul>
         *
         * @param msg   the message being visited
         * @param batch the batch that contains {@code msg}
         * @return always null
         */
        public Message visit(Message msg, MessageBatch batch) {
            EncryptHeader hdr;

            if(msg == null || (msg.getLength() == 0 && !encrypt_entire_message) || ((hdr=(EncryptHeader)msg.getHeader(id)) == null))
                return null;

            if(hdr.getType() == EncryptHeader.ENCRYPT) {
                // if queueing then pass into queue to be dealt with later
                if(queue_up) {
                    queueUpMessage(msg, batch);
                    return null;
                }

                // make sure we pass up any queued messages first
                if(!suppliedKey)
                    drainUpQueue();

                // lazily grab a lock/cipher pair; the lock stays held until unlock() is called
                if(lock == null) {
                    int index=getNextIndex();
                    lock=decoding_locks[index];
                    cipher=decoding_ciphers[index];
                    lock.lock();
                }

                try {
                    Message tmpMsg=decryptMessage(cipher, msg.copy()); // need to copy for possible xmits
                    if(tmpMsg != null)
                        batch.replace(msg, tmpMsg);
                }
                catch(Exception e) {
                    // the raw buffer may be null: don't let the error report itself fail with an NPE
                    byte[] raw=msg.getRawBuffer();
                    log.error("failed decrypting message from %s (offset=%d, length=%d, buf.length=%d): %s, headers are %s",
                              msg.getSrc(), msg.getOffset(), msg.getLength(), raw != null? raw.length : 0, e, msg.printHeaders());
                }
            }
            else {
                batch.remove(msg); // a control message will get handled by ENCRYPT and should not be passed up
                handleUpEvent(msg, hdr);
            }
            return null;
        }

        /** Releases the decoding lock if {@link #visit} acquired one, and forgets it. */
        protected void unlock() {
            if(lock != null) {
                lock.unlock();
                lock=null;
            }
        }

        /**
         * Moves a message from the batch into {@code upMessageQueue}. If the thread is interrupted while
         * queueing, the message stays in the batch and the interrupt status is restored for the caller.
         *
         * @param msg   the message to queue
         * @param batch the batch from which {@code msg} is removed once queued
         */
        protected void queueUpMessage(Message msg, MessageBatch batch) {
            log.trace("queueing up message as no session key established: %s", msg);
            try {
                upMessageQueue.put(msg);
                batch.remove(msg);
            }
            catch(InterruptedException e) {
                log.warn("interrupted while queueing up message", e);
                Thread.currentThread().interrupt(); // don't swallow the interrupt
            }
        }
    }


    /**
     * Header added to messages by ENCRYPT. It carries a type byte (one of {@link #ENCRYPT},
     * {@link #KEY_REQUEST} or {@link #SECRETKEY}, optionally OR-ed with
     * {@link #ENCRYPT_ENTIRE_MSG}) and the version bytes passed in by the creator.
     * <p>
     * Wire format: 1 byte type, 2 bytes version length, then the version bytes.
     */
    public static class EncryptHeader extends org.jgroups.Header {
        public static final byte ENCRYPT            = 0x01;
        public static final byte KEY_REQUEST        = 0x02;
        public static final byte SECRETKEY          = 0x04;
        public static final byte ENCRYPT_ENTIRE_MSG = 0x08;

        private   byte   type;
        protected byte[] version;


        /** No-arg constructor; fields are populated by {@link #readFrom(DataInput)} */
        public EncryptHeader() {
        }


        /**
         * @param type    the header type
         * @param version the version bytes, must not be null
         * @throws IllegalArgumentException if {@code version} is null
         */
        public EncryptHeader(byte type, byte[] version) {
            if(version == null)
                throw new IllegalArgumentException("version must be defined");
            this.type=type;
            this.version=version;
        }

        /**
         * @return the type with the {@link #ENCRYPT_ENTIRE_MSG} flag cleared
         */
        public byte getType() {
            byte without_entire_msg_flag=(byte)~ENCRYPT_ENTIRE_MSG;
            return (byte)(type & without_entire_msg_flag);
        }

        /**
         * @return the version bytes (the internal array, not a copy)
         */
        protected byte[] getVersion() {
            return version;
        }

        /**
         * @return the result of {@code Util.isFlagSet()} for {@link #ENCRYPT_ENTIRE_MSG} on the type byte
         */
        public boolean encryptEntireMessage() {
            return Util.isFlagSet(type, ENCRYPT_ENTIRE_MSG);
        }

        /** Writes the type byte, the version length as a short, and the version bytes */
        public void writeTo(DataOutput out) throws Exception {
            out.writeByte(type);
            out.writeShort(version.length);
            out.write(version);
        }

        /** Reads the fields in the order written by {@link #writeTo(DataOutput)} */
        public void readFrom(DataInput in) throws Exception {
            type=in.readByte();
            version=new byte[in.readShort()];
            in.readFully(version);
        }

        public String toString() {
            String version_info=version == null? "n/a" : version.length + " bytes";
            return "[type=" + type + " version=\"" + version_info + "\"]";
        }

        /**
         * @return the marshalled size: type byte + length short + version bytes
         */
        public int size() {
            return Global.BYTE_SIZE + Global.SHORT_SIZE + version.length;
        }
    }
}
